d655e7868f8521d726cf6a854c13a41a61341a1a,src/java/org/apache/cassandra/service/StorageProxy.java,StorageProxy,sendMessages,#String#Map#IWriteResponseHandler#,296
Before Change
// Fragment from the body of sendMessages: dispatches one message to the endpoints
// grouped under it in `messages`, either directly (local DC) or through a single
// remote endpoint that carries the remaining endpoints in a forward header.
Message message = messages.getKey();
// a single message object is used for unhinted writes, so clean out any forwards
// from previous loop iterations
// NOTE(review): removeHeader's result is not assigned, so this presumably mutates
// the shared message in place -- confirm against Message.
message.removeHeader(RowMutation.FORWARD_HEADER);
if (dataCenter.equals(localDataCenter))
{
// direct writes to local DC or old Cassandra versions
for (InetAddress destination : messages.getValue())
MessagingService.instance().sendRR(message, destination, handler);
}
else
{
// Non-local DC. First endpoint in list is the destination for this group
Iterator<InetAddress> iter = messages.getValue().iterator();
// NOTE(review): iter.next() is called without hasNext(); assumes the endpoint
// collection for this group is never empty -- confirm with the caller.
InetAddress target = iter.next();
// Add all the other destinations of the same message as a header in the primary message.
// If there are no other destinations, the loop body never runs and no forward header is set.
while (iter.hasNext())
{
InetAddress destination = iter.next();
// group all nodes in this DC as forward headers on the primary message
// A fresh buffer is allocated on every pass; the header accumulated so far is
// copied into it before the next address is appended, so the header bytes are
// re-copied once per additional destination.
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
// append to older addresses
byte[] previousHints = message.getHeader(RowMutation.FORWARD_HEADER);
if (previousHints != null)
dos.write(previousHints);
// raw IP address bytes of the destination, with no length prefix or separator
dos.write(destination.getAddress());
message.setHeader(RowMutation.FORWARD_HEADER, bos.toByteArray());
}
// send the combined message + forward headers
MessagingService.instance().sendRR(message, target, handler);
After Change
// The same message object is shared across unhinted writes, so start from a copy
// that carries no forward header left over from an earlier loop iteration.
Message message = messages.getKey().withHeaderRemoved(RowMutation.FORWARD_HEADER);
if (!dataCenter.equals(localDataCenter))
{
    // Remote DC: the first endpoint of the group receives the message; every
    // remaining endpoint is packed into the forward header as raw address bytes.
    Iterator<InetAddress> endpoints = messages.getValue().iterator();
    InetAddress primary = endpoints.next();
    ByteArrayOutputStream forwardBytes = new ByteArrayOutputStream();
    DataOutputStream forwardOut = new DataOutputStream(forwardBytes);
    while (endpoints.hasNext())
        forwardOut.write(endpoints.next().getAddress());
    message = message.withHeaderAdded(RowMutation.FORWARD_HEADER, forwardBytes.toByteArray());
    // One send per remote DC: the message plus its forward header goes to the primary endpoint.
    MessagingService.instance().sendRR(message, primary, handler);
}
else
{
    // Local DC (or old Cassandra versions): write to every endpoint directly.
    for (InetAddress destination : messages.getValue())
        MessagingService.instance().sendRR(message, destination, handler);
}